/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.util;

import io.trino.hadoop.TextLineLengthLimitExceededException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;

/**
 * A class that provides a line reader from an input stream.
 * Depending on the constructor used, lines will either be terminated by:
 * <ul>
 * <li>one of the following: '\n' (LF) , '\r' (CR),
 * or '\r\n' (CR+LF).</li>
 * <li><em>or</em>, a custom byte sequence delimiter</li>
 * </ul>
 * In both cases, EOF also terminates an otherwise unterminated
 * line.
 *
 * <p>A line that outgrows the limits passed to {@code readLine} makes the
 * read fail with {@link TextLineLengthLimitExceededException}; see
 * {@link #readLine(Text, int, int)} for the exact conditions.
 *
 * <p>The reader keeps its buffer and read position between calls and
 * performs no synchronization of its own.
 */
@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
public class LineReader
        implements Closeable
{
    // Limitation for array size is VM specific. Current HotSpot VM limitation
    // for array size is Integer.MAX_VALUE - 5 (2^31 - 1 - 5).
    // Integer.MAX_VALUE - 8 should be safe enough.
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
    private static final byte CR = '\r';
    private static final byte LF = '\n';
    // The line delimiter; null selects the CR / LF / CRLF rules
    private final byte[] recordDelimiterBytes;
    // capacity of the read buffer, as requested at construction time
    private final int bufferSize;
    private final InputStream in;
    private final byte[] buffer;
    // the number of bytes of real data in the buffer
    private int bufferLength;
    // the current position in the buffer
    private int bufferPosn;

    /**
     * Create a line reader that reads from the given stream using the
     * default buffer-size (64k).
     * @param in The input stream
     */
    public LineReader(InputStream in)
    {
        this(in, DEFAULT_BUFFER_SIZE, null);
    }

    /**
     * Create a line reader that reads from the given stream using the
     * given buffer-size.
     * @param in The input stream
     * @param bufferSize Size of the read buffer
     * @throws IllegalArgumentException if {@code bufferSize} is not positive
     */
    public LineReader(InputStream in, int bufferSize)
    {
        this(in, bufferSize, null);
    }

    /**
     * Create a line reader that reads from the given stream using the
     * <code>io.file.buffer.size</code> specified in the given
     * <code>Configuration</code>.
     * @param in input stream
     * @param conf configuration
     * @throws IOException declared by the signature and kept for source
     *  compatibility with existing callers
     * @throws IllegalArgumentException if the configured buffer size is not positive
     */
    public LineReader(InputStream in, Configuration conf)
            throws IOException
    {
        this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE), null);
    }

    /**
     * Create a line reader that reads from the given stream using the
     * default buffer-size, and using a custom delimiter of array of
     * bytes.
     * @param in The input stream
     * @param recordDelimiterBytes The delimiter; {@code null} selects the
     *  default CR / LF / CRLF handling
     */
    public LineReader(InputStream in, byte[] recordDelimiterBytes)
    {
        this(in, DEFAULT_BUFFER_SIZE, recordDelimiterBytes);
    }

    /**
     * Create a line reader that reads from the given stream using the
     * given buffer-size, and using a custom delimiter of array of
     * bytes.
     *
     * <p>All other constructors delegate to this one.
     *
     * @param in The input stream
     * @param bufferSize Size of the read buffer
     * @param recordDelimiterBytes The delimiter; {@code null} selects the
     *  default CR / LF / CRLF handling. The delimiter scan indexes this
     *  array starting at position 0, so an empty array is not usable.
     * @throws IllegalArgumentException if {@code bufferSize} is not positive
     */
    public LineReader(InputStream in, int bufferSize,
            byte[] recordDelimiterBytes)
    {
        // A zero-length buffer would make every read return 0 bytes, which the
        // read loops interpret as EOF: the stream would silently look empty.
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive: " + bufferSize);
        }
        this.in = in;
        this.bufferSize = bufferSize;
        this.buffer = new byte[bufferSize];
        this.recordDelimiterBytes = recordDelimiterBytes;
    }

    /**
     * Create a line reader that reads from the given stream using the
     * <code>io.file.buffer.size</code> specified in the given
     * <code>Configuration</code>, and using a custom delimiter of array of
     * bytes.
     * @param in input stream
     * @param conf configuration
     * @param recordDelimiterBytes The delimiter; {@code null} selects the
     *  default CR / LF / CRLF handling
     * @throws IOException declared by the signature and kept for source
     *  compatibility with existing callers
     * @throws IllegalArgumentException if the configured buffer size is not positive
     */
    public LineReader(InputStream in, Configuration conf,
            byte[] recordDelimiterBytes)
            throws IOException
    {
        this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE), recordDelimiterBytes);
    }

    /**
     * Close the underlying stream.
     * @throws IOException if closing the underlying stream fails
     */
    @Override
    public void close()
            throws IOException
    {
        in.close();
    }

    /**
     * Read one line from the InputStream into the given Text.
     *
     * <p>Both limits are first clamped to the largest array size considered
     * safe for the VM.
     *
     * @param str the object to store the given line (without newline)
     * @param maxLineLength the maximum number of bytes to store into str.
     *  If a chunk of the line does not fit and there is still room left in
     *  str, the read fails instead of truncating. If no room is left at all
     *  (for example when this limit is 0), the extra bytes are dropped
     *  without an error.
     * @param maxBytesToConsume the maximum number of bytes to consume
     *  in this call.  The check is made after each buffer has been scanned,
     *  so it can overshoot potentially by as much as one buffer length.
     *  Reaching it without having found a line terminator fails the read.
     *
     * @return the number of bytes read including the (longest) newline
     * found.
     *
     * @throws TextLineLengthLimitExceededException if one of the limits
     *  above is exceeded, or the consumed byte count does not fit in an int
     * @throws IOException if the underlying stream throws
     */
    public int readLine(Text str, int maxLineLength,
            int maxBytesToConsume)
            throws IOException
    {
        maxLineLength = Math.min(maxLineLength, MAX_ARRAY_SIZE);
        maxBytesToConsume = Math.min(maxBytesToConsume, MAX_ARRAY_SIZE);
        if (this.recordDelimiterBytes != null) {
            return readCustomLine(str, maxLineLength, maxBytesToConsume);
        }
        else {
            return readDefaultLine(str, maxLineLength, maxBytesToConsume);
        }
    }

    /**
     * Refill the read buffer from the stream.
     *
     * <p>This implementation ignores {@code inDelimiter}. The read loops pass
     * {@code true} when the previous buffer ended in a CR or in a partial
     * match of the custom delimiter, so an overriding class can take that
     * into account.
     *
     * @param in the stream to read from
     * @param buffer the buffer to fill from its start
     * @param inDelimiter whether the previous buffer ended inside a possible
     *  line terminator
     * @return the number of bytes placed in the buffer; a value of zero or
     *  less is treated as EOF by the callers
     * @throws IOException if the underlying stream throws
     */
    protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
            throws IOException
    {
        return in.read(buffer);
    }

    /**
     * Read a line terminated by one of CR, LF, or CRLF.
     *
     * @param str receives the line content, without its terminator
     * @param maxLineLength upper bound on the bytes stored into str
     * @param maxBytesToConsume upper bound (checked once per buffer) on the
     *  bytes consumed while looking for a terminator
     * @return the number of bytes consumed, terminator included
     * @throws IOException if the underlying stream throws, or (as
     *  TextLineLengthLimitExceededException) if a limit is exceeded
     */
    private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
            throws IOException
    {
        /* We're reading data from in, but the head of the stream may be
         * already buffered in buffer, so we have several cases:
         * 1. No newline characters are in the buffer, so we need to copy
         *    everything and read another buffer from the stream.
         * 2. An unambiguously terminated line is in buffer, so we just
         *    copy to str.
         * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
         *    in CR.  In this case we copy everything up to CR to str, but
         *    we also need to see what follows CR: if it's LF, then we
         *    need consume LF as well, so next call to readLine will read
         *    from after that.
         * We use a flag prevCharCR to signal if previous character was CR
         * and, if it happens to be at the end of the buffer, delay
         * consuming it until we have a chance to look at the char that
         * follows.
         */
        str.clear();
        int txtLength = 0; //tracks str.getLength(), as an optimization
        int newlineLength = 0; //length of terminating newline
        boolean prevCharCR = false; //true if prev char was CR
        long bytesConsumed = 0;
        do {
            int startPosn = bufferPosn; //starting from where we left off the last time
            if (bufferPosn >= bufferLength) {
                // buffer exhausted: rewind and refill it
                startPosn = bufferPosn = 0;
                if (prevCharCR) {
                    ++bytesConsumed; //account for CR from previous read
                }
                bufferLength = fillBuffer(in, buffer, prevCharCR);
                if (bufferLength <= 0) {
                    break; // EOF
                }
            }
            for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
                if (buffer[bufferPosn] == LF) {
                    newlineLength = prevCharCR ? 2 : 1;
                    ++bufferPosn; // at next invocation proceed from following byte
                    break;
                }
                if (prevCharCR) { //CR + notLF, we are at notLF
                    newlineLength = 1;
                    break;
                }
                prevCharCR = (buffer[bufferPosn] == CR);
            }
            int readLength = bufferPosn - startPosn;
            if (prevCharCR && newlineLength == 0) {
                --readLength; //CR at the end of the buffer
            }
            bytesConsumed += readLength;
            int appendLength = readLength - newlineLength;
            if (appendLength > maxLineLength - txtLength) {
                appendLength = maxLineLength - txtLength;
                if (appendLength > 0) {
                    // We want to fail the read when the line length is over the limit.
                    throw new TextLineLengthLimitExceededException("Too many bytes before newline: " + maxLineLength);
                }
                // No room left at all (e.g. maxLineLength == 0): the bytes are
                // consumed but not stored, and no error is raised.
            }
            if (appendLength > 0) {
                int newTxtLength = txtLength + appendLength;
                if (str.getBytes().length < newTxtLength && Math.max(newTxtLength, txtLength << 1) > MAX_ARRAY_SIZE) {
                    // If str need to be resized but the target capacity is over VM limit, it will trigger OOM.
                    // In such case we will throw an IOException so the caller can deal with it.
                    throw new TextLineLengthLimitExceededException("Too many bytes before newline: " + newTxtLength);
                }
                str.append(buffer, startPosn, appendLength);
                txtLength = newTxtLength;
            }
        }
        while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

        if (newlineLength == 0 && bytesConsumed >= maxBytesToConsume) {
            // It is possible that bytesConsumed is over the maxBytesToConsume but we
            // didn't append anything to str.bytes. If we have consumed over maxBytesToConsume
            // bytes but still haven't seen a line terminator, we will fail the read.
            throw new TextLineLengthLimitExceededException("Too many bytes before newline: " + bytesConsumed);
        }
        return toReturnedByteCount(bytesConsumed, "newline");
    }

    /**
     * Read a line terminated by a custom delimiter.
     *
     * @param str receives the record content, without the delimiter
     * @param maxLineLength upper bound on the bytes stored into str
     * @param maxBytesToConsume upper bound (checked once per buffer) on the
     *  bytes consumed while looking for the delimiter
     * @return the number of bytes consumed, delimiter included
     * @throws IOException if the underlying stream throws, or (as
     *  TextLineLengthLimitExceededException) if a limit is exceeded
     */
    private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
            throws IOException
    {
        /* We're reading data from inputStream, but the head of the stream may be
         *  already captured in the previous buffer, so we have several cases:
         *
         * 1. The buffer tail does not contain any character sequence which
         *    matches with the head of delimiter. We count it as a
         *    ambiguous byte count = 0
         *
         * 2. The buffer tail contains a X number of characters,
         *    that forms a sequence, which matches with the
         *    head of delimiter. We count ambiguous byte count = X
         *
         *    // ***  eg: A segment of input file is as follows
         *
         *    " record 1792: I found this bug very interesting and
         *     I have completely read about it. record 1793: This bug
         *     can be solved easily record 1794: This ."
         *
         *    delimiter = "record";
         *
         *    supposing:- String at the end of buffer =
         *    "I found this bug very interesting and I have completely re"
         *    There for next buffer = "ad about it. record 179       ...."
         *
         *     The matching characters in the input
         *     buffer tail and delimiter head = "re"
         *     Therefore, ambiguous byte count = 2 ****   //
         *
         *     2.1 If the following bytes are the remaining characters of
         *         the delimiter, then we have to capture only up to the starting
         *         position of delimiter. That means, we need not include the
         *         ambiguous characters in str.
         *
         *     2.2 If the following bytes are not the remaining characters of
         *         the delimiter ( as mentioned in the example ),
         *         then we have to include the ambiguous characters in str.
         */
        str.clear();
        // tracks the bytes appended from the buffer; note that ambiguous bytes
        // appended from the delimiter array below are not added to it
        int txtLength = 0;
        long bytesConsumed = 0;
        int delPosn = 0; // how many leading delimiter bytes are currently matched
        int ambiguousByteCount = 0; // To capture the ambiguous characters count
        do {
            int startPosn = bufferPosn; // Start from previous end position
            if (bufferPosn >= bufferLength) {
                // buffer exhausted: rewind and refill it
                startPosn = bufferPosn = 0;
                bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
                if (bufferLength <= 0) {
                    if (ambiguousByteCount > 0) {
                        // the partial delimiter at the very end of the stream is data
                        str.append(recordDelimiterBytes, 0, ambiguousByteCount);
                        bytesConsumed += ambiguousByteCount;
                    }
                    break; // EOF
                }
            }
            for (; bufferPosn < bufferLength; ++bufferPosn) {
                if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
                    delPosn++;
                    if (delPosn >= recordDelimiterBytes.length) {
                        bufferPosn++; // whole delimiter matched: resume after it next time
                        break;
                    }
                }
                else if (delPosn != 0) {
                    // partial match failed: go back to where it started, so the
                    // loop increment resumes scanning one byte after that point
                    bufferPosn -= delPosn;
                    if (bufferPosn < -1) {
                        // the match started in the previous buffer; restart at index 0
                        bufferPosn = -1;
                    }
                    delPosn = 0;
                }
            }
            int readLength = bufferPosn - startPosn;
            bytesConsumed += readLength;
            int appendLength = readLength - delPosn;
            if (appendLength > maxLineLength - txtLength) {
                appendLength = maxLineLength - txtLength;
                if (appendLength > 0) {
                    // We want to fail the read when the line length is over the limit.
                    throw new TextLineLengthLimitExceededException("Too many bytes before delimiter: " + maxLineLength);
                }
                // No room left at all (e.g. maxLineLength == 0): the bytes are
                // consumed but not stored, and no error is raised.
            }
            bytesConsumed += ambiguousByteCount;
            if (appendLength >= 0 && ambiguousByteCount > 0) {
                //appending the ambiguous characters (refer case 2.2)
                str.append(recordDelimiterBytes, 0, ambiguousByteCount);
                ambiguousByteCount = 0;
                // since it is now certain that the split did not split a delimiter we
                // should not read the next record: clear the flag otherwise duplicate
                // records could be generated
                unsetNeedAdditionalRecordAfterSplit();
            }
            if (appendLength > 0) {
                int newTxtLength = txtLength + appendLength;
                if (str.getBytes().length < newTxtLength && Math.max(newTxtLength, txtLength << 1) > MAX_ARRAY_SIZE) {
                    // If str need to be resized but the target capacity is over VM limit, it will trigger OOM.
                    // In such case we will throw an IOException so the caller can deal with it.
                    throw new TextLineLengthLimitExceededException("Too many bytes before delimiter: " + newTxtLength);
                }
                str.append(buffer, startPosn, appendLength);
                txtLength = newTxtLength;
            }
            if (bufferPosn >= bufferLength) {
                if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
                    // buffer ended in the middle of a possible delimiter
                    ambiguousByteCount = delPosn;
                    bytesConsumed -= ambiguousByteCount; //to be consumed in next
                }
            }
        }
        while (delPosn < recordDelimiterBytes.length
                && bytesConsumed < maxBytesToConsume);
        if (delPosn < recordDelimiterBytes.length
                && bytesConsumed >= maxBytesToConsume) {
            // It is possible that bytesConsumed is over the maxBytesToConsume but we
            // didn't append anything to str.bytes. If we have consumed over maxBytesToConsume
            // bytes but still haven't seen a line terminator, we will fail the read.
            throw new TextLineLengthLimitExceededException("Too many bytes before delimiter: " + bytesConsumed);
        }
        return toReturnedByteCount(bytesConsumed, "delimiter");
    }

    /**
     * Narrow the consumed byte count to the int returned by readLine.
     *
     * <p>The count is accumulated as a long and the consumption limit is only
     * checked once per buffer, so the final pass can push it past
     * Integer.MAX_VALUE. A plain cast would then wrap to a negative value, so
     * the read is failed instead.
     *
     * @param bytesConsumed the accumulated byte count
     * @param terminatorName the word used for the terminator in the error message
     * @return bytesConsumed as an int
     * @throws IOException (as TextLineLengthLimitExceededException) if the
     *  count does not fit in an int
     */
    private static int toReturnedByteCount(long bytesConsumed, String terminatorName)
            throws IOException
    {
        if (bytesConsumed > Integer.MAX_VALUE) {
            throw new TextLineLengthLimitExceededException("Too many bytes before " + terminatorName + ": " + bytesConsumed);
        }
        return (int) bytesConsumed;
    }

    /**
     * Read from the InputStream into the given Text.
     * Equivalent to {@link #readLine(Text, int, int)} with
     * Integer.MAX_VALUE as the consumption limit.
     * @param str the object to store the given line
     * @param maxLineLength the maximum number of bytes to store into str.
     * @return the number of bytes read including the newline
     * @throws IOException if the underlying stream throws
     */
    public int readLine(Text str, int maxLineLength)
            throws IOException
    {
        return readLine(str, maxLineLength, Integer.MAX_VALUE);
    }

    /**
     * Read from the InputStream into the given Text.
     * Equivalent to {@link #readLine(Text, int, int)} with
     * Integer.MAX_VALUE for both limits.
     * @param str the object to store the given line
     * @return the number of bytes read including the newline
     * @throws IOException if the underlying stream throws
     */
    public int readLine(Text str)
            throws IOException
    {
        return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
    }

    /**
     * @return the current read position inside the buffer
     */
    protected int getBufferPosn()
    {
        return bufferPosn;
    }

    /**
     * @return the buffer size this reader was created with
     */
    protected int getBufferSize()
    {
        return bufferSize;
    }

    /**
     * Hook invoked by the custom-delimiter reader once bytes that looked like
     * the start of a delimiter at the end of a buffer turned out to be
     * ordinary data. This implementation does nothing.
     */
    protected void unsetNeedAdditionalRecordAfterSplit()
    {
        // needed for custom multi byte line delimiters only
        // see MAPREDUCE-6549 for details
    }
}
